package com.starzy.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;

/**
 * Computes the common friends of every pair of people using two chained MapReduce jobs.
 *
 * <p>Each input line has the form {@code person:friend1,friend2,...}, for example:
 * <pre>
 *          A:B,C,D,F,E,O
 *          B:A,C,E,K
 *          C:F,A,D,I
 *          D:A,E,F,L
 *          E:B,C,D,M,L
 *          F:A,B,C,D,E,O,M
 *          G:A,C,D,E,F
 *          H:A,C,D,E,O
 *          I:A,O
 *          J:B,O
 *          K:A,C,D
 *          L:D,E,F
 *          M:E,F,G
 *          O:A,H,I,J,K
 * </pre>
 *
 * <p>Step 1 inverts the relation: for every friend it emits the list of people who have that
 * friend, joined by {@code -}. Step 2 reads that output, builds every pair of people sharing
 * the friend and collects, per pair, the friends they have in common.
 *
 * @author starzy https://www.cnblogs.com/starzy
 * @since 2020/12/15
 */
public class CommonFriendsMR {

    /** Directory containing the raw friend-list input. */
    private static final String INPUT_DIR = "D:/data/friends/input";

    /** Output directory of step 1, which is also the input directory of step 2. */
    private static final String STEP1_OUTPUT_DIR = "D:/data/friends/step1_output";

    /** Output directory of step 2 (the final result). */
    private static final String STEP2_OUTPUT_DIR = "D:/data/friends/step2_output";

    /** Interval, in milliseconds, between checks of the job-control state. */
    private static final long POLL_INTERVAL_MS = 500L;

    /**
     * Configures both jobs, runs them in dependency order through {@link JobControl} and exits
     * the JVM with status 0 when every job succeeded, or 1 when at least one job failed.
     *
     * @param args ignored
     * @throws IOException if a job cannot be created or an output directory cannot be deleted
     * @throws ClassNotFoundException kept for signature compatibility
     * @throws InterruptedException if the thread is interrupted while waiting for the jobs
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        Path inputPath = new Path(INPUT_DIR);
        Path step1OutputPath = new Path(STEP1_OUTPUT_DIR);
        Path step2OutputPath = new Path(STEP2_OUTPUT_DIR);

        // Step 1: friend -> people who list that friend.
        ControlledJob step1ControlledJob = buildControlledJob(conf, fs, "CommonFriends-step1",
                Step1CommonFriendMapper.class, Step1CommonFriendReducer.class,
                inputPath, step1OutputPath);

        // Step 2: pair of people -> friends they share. Consumes the output of step 1.
        ControlledJob step2ControlledJob = buildControlledJob(conf, fs, "CommonFriends-step2",
                Step2CommonFriendMapper.class, Step2CommonFriendReducer.class,
                step1OutputPath, step2OutputPath);

        // Step 2 must not start before step 1 has finished successfully.
        step2ControlledJob.addDependingJob(step1ControlledJob);

        // JobControl schedules the jobs in dependency order on its own thread.
        JobControl commonFriendsJob = new JobControl("CommonFriends");
        commonFriendsJob.addJob(step1ControlledJob);
        commonFriendsJob.addJob(step2ControlledJob);

        Thread thread = new Thread(commonFriendsJob, "CommonFriends-JobControl");
        thread.setDaemon(true);
        thread.start();
        try {
            while (!commonFriendsJob.allFinished()) {
                Thread.sleep(POLL_INTERVAL_MS);
            }
        } finally {
            // JobControl.run() loops until it is explicitly stopped.
            commonFriendsJob.stop();
        }

        boolean failed = !commonFriendsJob.getFailedJobList().isEmpty();
        for (ControlledJob failedJob : commonFriendsJob.getFailedJobList()) {
            System.err.println("Job failed: " + failedJob.getJobName() + " - " + failedJob.getMessage());
        }

        System.exit(failed ? 1 : 0);
    }

    /**
     * Creates a job whose map and reduce outputs are all {@link Text}, removes a stale output
     * directory if one exists and wraps the job in a {@link ControlledJob}.
     *
     * @param conf base configuration for the job
     * @param fs file system used to delete an existing output directory
     * @param jobName name of the job
     * @param mapperClass mapper implementation
     * @param reducerClass reducer implementation
     * @param inputPath input directory
     * @param outputPath output directory; deleted recursively when it already exists
     * @return the controlled job wrapping the configured job
     * @throws IOException if the job cannot be created or the output directory cannot be deleted
     */
    private static ControlledJob buildControlledJob(Configuration conf, FileSystem fs, String jobName,
                                                    Class<? extends Mapper<?, ?, ?, ?>> mapperClass,
                                                    Class<? extends Reducer<?, ?, ?, ?>> reducerClass,
                                                    Path inputPath, Path outputPath) throws IOException {
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(CommonFriendsMR.class);
        job.setMapperClass(mapperClass);
        job.setReducerClass(reducerClass);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // The output format refuses to write into an existing directory.
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        ControlledJob controlledJob = new ControlledJob(job.getConfiguration());
        controlledJob.setJob(job);
        return controlledJob;
    }

    /**
     * Joins the string form of every value with the given separator, without a leading or
     * trailing separator.
     *
     * @param values values to join
     * @param separator text placed between consecutive values
     * @return the joined string, empty when there are no values
     */
    private static String join(Iterable<Text> values, String separator) {
        StringBuilder joined = new StringBuilder();
        boolean first = true;
        for (Text value : values) {
            if (!first) {
                joined.append(separator);
            }
            joined.append(value.toString());
            first = false;
        }
        return joined.toString();
    }

    /**
     * Step 1 mapper: turns a line {@code person:friend1,friend2,...} into one
     * {@code (friend, person)} record per listed friend.
     */
    static class Step1CommonFriendMapper extends Mapper<LongWritable, Text, Text, Text> {
        // Reused across records; context.write serializes the contents immediately.
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] info = value.toString().trim().split(":");
            // Skip blank lines and lines without a "person:friends" structure.
            if (info.length < 2) {
                return;
            }
            String myName = info[0].trim();
            if (myName.isEmpty()) {
                return;
            }
            outValue.set(myName);
            for (String friend : info[1].split(",")) {
                String friendName = friend.trim();
                if (friendName.isEmpty()) {
                    continue;
                }
                outKey.set(friendName);
                context.write(outKey, outValue);
            }
        }
    }

    /**
     * Step 1 reducer: for a friend, writes all people who list that friend, joined by
     * {@code -}.
     */
    static class Step1CommonFriendReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(key, new Text(join(values, "-")));
        }
    }

    /**
     * Step 2 mapper: reads a step 1 line {@code friend<TAB>p1-p2-...} and emits
     * {@code (pi-pj, friend)} for every pair of people on that line.
     */
    static class Step2CommonFriendMapper extends Mapper<LongWritable, Text, Text, Text> {
        // Reused across records; context.write serializes the contents immediately.
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = value.toString().split("\t");
            // Skip blank lines and lines without a "friend<TAB>persons" structure.
            if (tokens.length < 2) {
                return;
            }
            String[] persons = tokens[1].split("-");
            // Sorting makes the pair key canonical: "A-B" is never also emitted as "B-A".
            Arrays.sort(persons);
            outValue.set(tokens[0]);
            for (int i = 0; i < persons.length - 1; i++) {
                for (int j = i + 1; j < persons.length; j++) {
                    outKey.set(persons[i] + "-" + persons[j]);
                    context.write(outKey, outValue);
                }
            }
        }
    }

    /**
     * Step 2 reducer: for a pair of people, writes all of their common friends separated by a
     * single space.
     */
    static class Step2CommonFriendReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(key, new Text(join(values, " ")));
        }
    }
}
